package com.zzz.CustomeInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * A {@link RecordReader} that emits the entire contents of one {@link FileSplit} as a single
 * record. The key is always {@link NullWritable} and the value is a {@link BytesWritable}
 * holding every byte of the split.
 *
 * <p>NOTE(review): the whole split is buffered in memory. This presumably relies on the owning
 * InputFormat not splitting files, and on inputs small enough for a single byte array. Confirm
 * against the InputFormat that creates this reader.
 */
public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {
    /** The split this reader is responsible for. */
    private FileSplit fileSplit;
    /** Job configuration, used to resolve the FileSystem that holds the split's file. */
    private Configuration configuration;
    /** Value of the current (and only) record produced by this reader. */
    private BytesWritable bytesWritable;
    /** Whether the split has already been read. It stays false until the first nextKeyValue(). */
    private boolean flag = false;

    /**
     * Initializes the reader's state from the given split and task context.
     *
     * @param inputSplit the split to read; it must be a {@link FileSplit}
     * @param taskAttemptContext context supplying the job configuration
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) inputSplit;
        this.configuration = taskAttemptContext.getConfiguration();
        this.bytesWritable = new BytesWritable();
    }

    /**
     * Advances to the next record. The first call reads the entire split into the value and
     * returns true. Every later call returns false, because the split holds exactly one record.
     *
     * @return true if a record was read, false if the split is exhausted
     * @throws IOException if the split is too large to buffer in a byte array, or if reading fails
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (flag) {
            // The single record was already produced.
            return false;
        }

        long length = fileSplit.getLength();
        if (length > Integer.MAX_VALUE) {
            // Casting to int would silently truncate and produce a corrupt record.
            throw new IOException("Split too large to buffer in memory: "
                    + fileSplit.getPath() + " (" + length + " bytes)");
        }
        byte[] bytes = new byte[(int) length];

        Path path = fileSplit.getPath();
        // The FileSystem is not closed here: Hadoop may share (cache) FileSystem instances.
        FileSystem fileSystem = path.getFileSystem(configuration);

        // try-with-resources ensures the stream is released even if the read fails.
        try (FSDataInputStream in = fileSystem.open(path)) {
            // Honour the split's start offset instead of assuming it begins at byte 0.
            in.seek(fileSplit.getStart());
            IOUtils.readFully(in, bytes, 0, bytes.length);
        }

        bytesWritable.set(bytes, 0, bytes.length);
        flag = true;
        return true;
    }

    /**
     * Returns the current key, which is always {@link NullWritable}.
     *
     * @return the NullWritable singleton
     */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /**
     * Returns the current value, which holds the split's bytes once nextKeyValue() succeeds.
     *
     * @return the current value
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    /**
     * Reports progress. It is 0 before the split is read and 1 afterwards.
     *
     * @return 0.0f or 1.0f
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return flag ? 1.0f : 0.0f;
    }

    /**
     * Releases resources. Nothing needs releasing here, because the input stream is closed
     * inside nextKeyValue() as soon as the split has been read.
     */
    @Override
    public void close() throws IOException {
        // Intentionally empty: no resources are held between calls.
    }
}
